-
Notifications
You must be signed in to change notification settings - Fork 99
MINIFICPP-2603 Add Record Reader and Record Writer properties to MQTT processors #2004
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds Record Reader and Record Writer properties to the MQTT processors (PublishMQTT and ConsumeMQTT) to enable structured record processing. Instead of treating MQTT messages as raw payloads, these processors can now parse messages into records using a Record Reader and serialize them back using a Record Writer.
- Adds Record Reader and Record Writer properties to both PublishMQTT and ConsumeMQTT processors
- Implements record-based processing logic that splits individual records for publishing and aggregates records for consuming
- Includes comprehensive test coverage for the new functionality
Reviewed Changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| minifi-api/include/minifi-cpp/core/Record.h | Adds constructor and contains() method to Record class for better record manipulation |
| extensions/standard-processors/processors/SplitRecord.cpp | Refactors to use common utility function for getting RecordSetIO services |
| extensions/mqtt/processors/PublishMQTT.h | Adds Record Reader/Writer properties and makes sendMessage method virtual for testing |
| extensions/mqtt/processors/PublishMQTT.cpp | Implements record processing logic that splits records into individual flow files |
| extensions/mqtt/processors/ConsumeMQTT.h | Adds Record Reader/Writer properties and Add Attributes As Fields option |
| extensions/mqtt/processors/ConsumeMQTT.cpp | Implements record aggregation logic and attribute handling for record mode |
| extensions/mqtt/processors/AbstractMQTTProcessor.h | Adds record set reader/writer member variables and makes initializeClient virtual |
| extension-utils/include/utils/ProcessorConfigUtils.h | Adds utility function for getting and validating RecordSetIO services |
| extensions/mqtt/tests/PublishMQTTTests.cpp | Comprehensive test coverage for record-based publishing functionality |
| extensions/mqtt/tests/ConsumeMQTTTests.cpp | Comprehensive test coverage for record-based consuming functionality |
| docker/test/integration/ | Integration test setup and scenarios for MQTT record processing |
| PROCESSORS.md | Documentation updates for new properties and functionality |
Comments suppressed due to low confidence (1)
extensions/mqtt/tests/ConsumeMQTTTests.cpp:151
- Spelling error: 'non-existant' should be 'non-existent'.
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.
27903b9 to
b7b0564
Compare
97ba156 to
2aae8e8
Compare
b7b0564 to
2d9cdcd
Compare
2aae8e8 to
a5d42f6
Compare
2d9cdcd to
e230c2c
Compare
a5d42f6 to
fa66a5d
Compare
e230c2c to
d070bdc
Compare
33a23ce to
295c2b7
Compare
295c2b7 to
14f9f1a
Compare
14f9f1a to
071704c
Compare
071704c to
12ae77b
Compare
e05ba80 to
034d4d1
Compare
bf24a9e to
06aedaf
Compare
… processors Signed-off-by: Ferenc Gerlits <[email protected]> Closes apache#2004
https://issues.apache.org/jira/browse/MINIFICPP-2603
Depends on #1997
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.